---
title: How to run flows in local processes
sidebarTitle: Run Flows in Local Processes
description: Create a deployment for a flow by calling the `serve` method.
---

The simplest way to create a [deployment](/v3/deploy) for your flow is by calling its `serve` method.

## Serve a flow

The `serve` method creates a deployment for the flow and starts a long-running process
that monitors for work from the Prefect server.
When work is found, it is executed within its own isolated subprocess.

```python hello_world.py
from prefect import flow


@flow(log_prints=True)
def hello_world(name: str = "world", goodbye: bool = False):
    print(f"Hello {name} from Prefect! 🤗")

    if goodbye:
        print(f"Goodbye {name}!")


if __name__ == "__main__":
    # creates a deployment and starts a long-running
    # process that listens for scheduled work
    hello_world.serve(
        name="my-first-deployment",
        tags=["onboarding"],
        parameters={"goodbye": True},
        interval=60,  # run every 60 seconds
    )
```

This interface provides the configuration for a deployment (with no 
strong infrastructure requirements), such as:

- schedules
- event triggers
- metadata such as tags and description
- default parameter values

<Tip>
**Schedules are auto-paused on shutdown**

By default, stopping the process running `flow.serve` will pause the schedule 
for the deployment (if it has one).

When running this in environments where restarts are expected, use the
`pause_on_shutdown=False` flag to prevent this behavior:

```python
if __name__ == "__main__":
    hello_world.serve(
        name="my-first-deployment",
        tags=["onboarding"],
        parameters={"goodbye": True},
        pause_on_shutdown=False,
        interval=60
    )
```
</Tip>

## Additional serve options

The `serve` method on flows exposes many options for the deployment.
Some of the most commonly used are:

- `cron`: sets a cron string schedule for the deployment; see
[schedules](/v3/automate/add-schedules/) for more advanced scheduling options
- `tags`: tags the deployment and its runs for bookkeeping and filtering
- `description`: documents what the deployment does; defaults to the
docstring of the flow function, if it has one
- `version`: tracks changes to your deployment; defaults to a hash of the
file containing the flow; popular options include semver tags or git commit hashes
- `triggers`: defines a set of conditions for when the deployment should run; see
[triggers](/v3/concepts/event-triggers/) for more on Prefect Events concepts

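The following example passes several of these options to `serve`. It assumes a flow named `get_repo_info` is defined earlier in the same script: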

```python
if __name__ == "__main__":
    get_repo_info.serve(
        name="my-first-deployment",
        cron="* * * * *",
        tags=["testing", "tutorial"],
        description="Given a GitHub repository, logs repository statistics for that repo.",
        version="tutorial/deployments",
    )
```
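
The `cron` value in the example above, `"* * * * *"`, runs the deployment every minute. As a reading aid, the following sketch labels the five fields of a cron expression. `describe_cron` and `CRON_FIELDS` are hypothetical names used only for illustration; they are not part of Prefect, and the sketch assumes the common five-field cron format.

```python
# Field names for a standard five-field cron expression, in order.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict[str, str]:
    """Label each field of a five-field cron expression (illustrative helper)."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


# Every minute, as in the example above
print(describe_cron("* * * * *"))

# 09:00 on weekdays (Monday through Friday)
print(describe_cron("0 9 * * 1-5"))
```

This helper only labels fields; it does not validate the values within them.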


<Tip>
**Triggers with `.serve`**

See this [example](/v3/how-to-guides/automations/chaining-deployments-with-events) that triggers downstream work on upstream events.
</Tip>

<Note>
**`serve()` is a long-running process**

To execute remotely triggered or scheduled runs, your script with `flow.serve` must be actively running.
Stop the script with `CTRL+C` and your schedule will automatically pause.
</Note>

## Serve multiple flows at once

Serve multiple flows with the same process using the `serve` utility along with the `to_deployment` method of flows:

```python serve_two_flows.py
import time
from prefect import flow, serve


@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)


@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return


if __name__ == "__main__":
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")
    serve(slow_deploy, fast_deploy)
```

The behavior and interfaces are identical to the single flow case.
A few things to note:

- the `flow.to_deployment` interface exposes the *exact same* options as `flow.serve`; this method 
produces a deployment object
- the deployments are only registered with the API once `serve(...)` is called
- when serving multiple deployments, the only requirement is that they share a Python environment; 
they can be executed and scheduled independently of each other

A few optional steps for exploration include:

- pause and unpause the schedule for the `"sleeper"` deployment
- use the UI to submit ad-hoc runs for the `"sleeper"` deployment with different values for `sleep`
- cancel an active run for the `"sleeper"` deployment from the UI

<Tip>
**Hybrid execution option**

Prefect's deployment interface allows you to choose a hybrid execution model.
Whether you use Prefect Cloud or self-host Prefect server, you can run workflows in the
environments best suited to their execution.
This model enables efficient use of your infrastructure resources while maintaining the privacy
of your code and data.
There is no ingress required.
Read more about our [hybrid model](https://www.prefect.io/security/overview/#hybrid-model).
</Tip>

## Serve instance methods

You can serve flow methods that are part of a class instance. This is useful when you want to configure a flow once at initialization time and reuse that configuration across all runs.

```python data_processor.py
from prefect import flow


class DataProcessor:
    """Processor configured at initialization time."""

    def __init__(self, environment: str):
        # Configuration is set once when the instance is created
        if environment == "prod":
            self.api_url = "https://api.example.com"
            self.batch_size = 1000
        else:
            self.api_url = "https://staging.example.com"
            self.batch_size = 100

    @flow(log_prints=True)
    def process_batch(self, batch_id: str):
        """Process a batch using the configured settings."""
        print(f"Processing batch {batch_id}")
        print(f"API URL: {self.api_url}")
        print(f"Batch size: {self.batch_size}")
        # processing logic here using self.api_url and self.batch_size


if __name__ == "__main__":
    # Create processor configured for staging environment
    processor = DataProcessor(environment="staging")
    # All flow runs will use the staging configuration
    processor.process_batch.serve(name="batch-processor")
```

The instance configuration (set during `__init__`) is available to all flow runs. This is useful for environment-specific settings, connection parameters, or any configuration that should be consistent across all runs of the deployment.

## Retrieve a flow from remote storage

As with the `.deploy` method, use the `flow.from_source` method to define how to retrieve the flow that you want to serve.


### `from_source`

The `flow.from_source` method on `Flow` objects requires a `source` and an `entrypoint`.


#### `source`

The `source` of your deployment can be:

- a path to a local directory such as `path/to/a/local/directory`
- a repository URL such as `https://github.com/org/repo.git`
- a `GitRepository` object that accepts
    - a repository URL
    - a reference to a branch, tag, or commit hash
    - `GitCredentials` for private repositories

#### `entrypoint`

A flow `entrypoint` is the path to the file containing the flow, relative to the root of that `source`, followed by the name of the flow function, in the form:

```python
{path}:{flow_name}
```
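
To make the format concrete, here is a minimal sketch that splits an entrypoint string into its two parts. `parse_entrypoint` and `Entrypoint` are hypothetical names for illustration only; they are not Prefect functions, and Prefect's own parsing may differ in detail.

```python
from pathlib import PurePosixPath
from typing import NamedTuple


class Entrypoint(NamedTuple):
    """The two parts of an entrypoint string (illustrative type)."""

    path: PurePosixPath
    flow_name: str


def parse_entrypoint(entrypoint: str) -> Entrypoint:
    """Split '{path}:{flow_name}' on the last colon."""
    path, sep, flow_name = entrypoint.rpartition(":")
    if not sep or not path or not flow_name:
        raise ValueError(f"expected '{{path}}:{{flow_name}}', got {entrypoint!r}")
    return Entrypoint(PurePosixPath(path), flow_name)


parsed = parse_entrypoint("flows/hello_world.py:hello")
print(parsed.path)       # file path within the source
print(parsed.flow_name)  # name of the flow function in that file
```

Splitting on the last colon rather than the first keeps the sketch from breaking on paths that themselves contain a colon.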

For example, the following code loads the `hello` flow from the `flows/hello_world.py` file in the `PrefectHQ/examples` repository:

```python load_from_url.py
from prefect import flow


my_flow = flow.from_source(
    source="https://github.com/PrefectHQ/examples.git",
    entrypoint="flows/hello_world.py:hello"
)


if __name__ == "__main__":
    my_flow()
```

```bash
16:40:33.818 | INFO    | prefect.engine - Created flow run 'muscular-perch' for flow 'hello'
16:40:34.048 | INFO    | Flow run 'muscular-perch' - Hello world!
16:40:34.706 | INFO    | Flow run 'muscular-perch' - Finished in state Completed()
```


For more ways to store and access flow code, see the [Retrieve code from storage page](/v3/deploy/infrastructure-concepts/store-flow-code).

<Tip>
**You can serve loaded flows**

You can serve a flow loaded from remote storage with the same [`serve`](#serve-a-flow) method as a local flow:

```python serve_loaded_flow.py
from prefect import flow


if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/org/repo.git",
        entrypoint="flows.py:my_flow"
    ).serve(name="my-deployment")
```
</Tip>

### Remote storage polling

When you serve a flow loaded from remote storage, the serving process periodically polls your remote storage for updates to the flow's code. 
This pattern allows you to update your flow code without restarting the serving process.
Note that if you change metadata associated with your flow's deployment such as parameters, you _will_ need to restart the serve process.

## Further reading

- [Serve flows in a long-lived Docker container](/v3/deploy/static-infrastructure-examples/docker)
- [Work pools and deployments with dynamic infrastructure](/v3/deploy/infrastructure-concepts/work-pools)